using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Naruto.Subscribe.Extension;
using Naruto.Subscribe.Interface;
using Naruto.Subscribe.Object;
using Naruto.Subscribe.Provider.Kafka.Interface;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Naruto.Subscribe.Provider.Kafka.Internal
{
    /// <summary>
    /// Kafka-backed implementation of <see cref="IConsumerClient"/>.
    /// Obtains a consumer from the injected connection factory, subscribes it to topics,
    /// polls for messages and raises <see cref="OnMessageReceived"/> for each one.
    /// </summary>
    public class KafkaConsumerClient : IConsumerClient
    {
        /// <summary>
        /// Logger used for subscription and error diagnostics.
        /// </summary>
        private readonly ILogger _logger;

        /// <summary>
        /// Factory that supplies the underlying Kafka consumer.
        /// </summary>
        private readonly IConsumerConnectionFactory _consumerConnectionFactory;

        /// <summary>
        /// Raised for every consumed message. The sender argument is the raw
        /// <see cref="ConsumeResult{TKey, TValue}"/>, which is the object expected by
        /// <see cref="CommitAsync"/> and <see cref="ResetAsync"/>.
        /// </summary>
        public event EventHandler<MessageModel> OnMessageReceived;

        /// <summary>
        /// The underlying consumer; assigned in <see cref="SubscribeAsync"/> and null before that.
        /// </summary>
        private IConsumer<string, byte[]> _consumer;

        /// <summary>
        /// Creates a new consumer client.
        /// </summary>
        /// <param name="logger">Logger used by this client.</param>
        /// <param name="consumerConnectionFactory">Factory used to obtain the Kafka consumer.</param>
        public KafkaConsumerClient(ILogger logger, IConsumerConnectionFactory consumerConnectionFactory)
        {
            _logger = logger;
            _consumerConnectionFactory = consumerConnectionFactory;
        }

        /// <summary>
        /// Releases the resources held by this client.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            // The class is unsealed: keep a finalizer added by a derived type from running after an explicit dispose.
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the resources held by this client by disposing the underlying consumer, if any.
        /// </summary>
        /// <param name="isDispose">Not inspected by this implementation; available to overriding types.</param>
        protected virtual void Dispose(bool isDispose)
        {
            _consumer?.Dispose();
        }

        /// <summary>
        /// Obtains a consumer from the connection factory and subscribes it to the given topics.
        /// </summary>
        /// <param name="subscribeNames">Names of the topics to subscribe to.</param>
        /// <param name="cancellationToken">Checked once on entry; the call throws if cancellation was already requested.</param>
        /// <returns>A completed task; the work is performed synchronously.</returns>
        public Task SubscribeAsync(List<string> subscribeNames, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            _logger.LogInformation("正在订阅：{names}", subscribeNames.ToJsonString());

            // Fetch the consumer, then subscribe by topic name (no explicit partition assignment is made here).
            _consumer = _consumerConnectionFactory.Get();
            _consumer.Subscribe(subscribeNames);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Polls the consumer and dispatches every received message to <see cref="Handler"/>.
        /// The polling loop runs synchronously on the calling thread; the returned task is
        /// already completed by the time this method returns.
        /// </summary>
        /// <param name="cancellationToken">Token that ends the polling loop.</param>
        /// <returns>A completed task once the polling loop has ended.</returns>
        public Task ListingAsync(CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            foreach (var consumeResult in Loop(cancellationToken))
            {
                try
                {
                    Handler(consumeResult);
                }
                catch (Exception ex)
                {
                    // A failure in one message must not stop the loop. Pass the exception
                    // object itself so the stack trace and exception type are logged too.
                    _logger.LogError(ex, "loop发生错误,{ex}", ex.Message);
                }
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// Lazily yields consumed messages until cancellation is requested.
        /// Null results, partition-EOF markers and messages without a value are skipped.
        /// </summary>
        /// <param name="cancellationToken">Token checked before each poll and also handed to the consumer.</param>
        /// <returns>A lazy sequence of consume results that carry a non-null message value.</returns>
        protected virtual IEnumerable<ConsumeResult<string, byte[]>> Loop(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // NOTE(review): the token is also passed to Consume, so cancellation during a
                // poll presumably surfaces as an exception from Consume — confirm against Confluent.Kafka docs.
                var consumeResult = _consumer.Consume(cancellationToken);

                // Short-circuit evaluation is required: the later operands dereference consumeResult.
                if (consumeResult == null || consumeResult.IsPartitionEOF || consumeResult.Message.Value == null)
                {
                    continue;
                }

                yield return consumeResult;
            }
        }

        /// <summary>
        /// Converts a consumed Kafka message into a <see cref="MessageModel"/>, attaches the
        /// Kafka headers and raises <see cref="OnMessageReceived"/>.
        /// </summary>
        /// <param name="consumeResult">The consumed message; a null value is ignored.</param>
        protected virtual void Handler(ConsumeResult<string, byte[]> consumeResult)
        {
            if (consumeResult == null)
            {
                return;
            }

            // Payload bytes -> string -> MessageModel.
            var subscribeMsg = consumeResult.Message.Value.ToStr();
            var messageModel = subscribeMsg.ToDeserialized<MessageModel>();

            // The Header property is replaced with the headers read from the Kafka message.
            messageModel.Header = BuildMessageHeaders(consumeResult);

            // The raw consume result is passed as sender.
            OnMessageReceived?.Invoke(consumeResult, messageModel);
        }

        /// <summary>
        /// Builds a case-insensitive dictionary from the Kafka message headers.
        /// </summary>
        /// <param name="consumeResult">The consumed message whose headers are read.</param>
        /// <returns>
        /// Header names mapped to their values converted to strings; empty when the message has no headers.
        /// When a header name occurs more than once, the first occurrence is kept.
        /// </returns>
        protected virtual IDictionary<string, string> BuildMessageHeaders(ConsumeResult<string, byte[]> consumeResult)
        {
            var result = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

            var headers = consumeResult.Message.Headers;
            if (headers == null)
            {
                return result;
            }

            foreach (var item in headers)
            {
                // TryAdd leaves an existing entry untouched, so duplicate names keep the first value.
                result.TryAdd(item.Key, item.GetValueBytes()?.ToStr());
            }
            return result;
        }

        /// <summary>
        /// Commits the offset of the given consume result.
        /// </summary>
        /// <param name="sender">The <see cref="ConsumeResult{TKey, TValue}"/> to commit; cast directly, so any other type throws.</param>
        /// <returns>A completed task; the commit is performed synchronously.</returns>
        public Task CommitAsync(object sender)
        {
            var consumerResult = (ConsumeResult<string, byte[]>)sender;
            _consumer.Commit(consumerResult);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Seeks the consumer back to the offset of the given consume result.
        /// </summary>
        /// <param name="sender">The <see cref="ConsumeResult{TKey, TValue}"/> to seek to; cast directly, so any other type throws.</param>
        /// <returns>A completed task; the seek is performed synchronously.</returns>
        public Task ResetAsync(object sender)
        {
            var consumerResult = (ConsumeResult<string, byte[]>)sender;
            // Reposition this topic partition at the message's own offset.
            _consumer.Seek(new TopicPartitionOffset(consumerResult.TopicPartition, consumerResult.Offset));
            return Task.CompletedTask;
        }
    }
}
